package com.ycl.javacore.kafka;

import org.apache.commons.collections4.IteratorUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;

/**
 * Demo Kafka consumer that subscribes to the {@code ycltest} topic and prints the value of
 * every record it receives to standard output.
 *
 * <p>{@link #main(String[])} starts two threads. Each thread creates its own {@code Consumer}
 * instance, and each {@code consumeN()} call creates its own {@link KafkaConsumer}; both use the
 * same {@code group.id} ({@code gp1}). {@link #consume1()} deliberately fails on the first record
 * it sees, while {@link #consume2()} keeps consuming.
 *
 * <p>Author: OF1089 (Yang Chenglong)<br>
 * Date: 2020/2/7<br>
 * Time: 3:04 PM
 */
public class Consumer {

    /** Topic both consume methods subscribe to. */
    private static final String TOPIC = "ycltest";

    /** Timeout, in milliseconds, passed to every {@code poll} call. */
    private static final long POLL_TIMEOUT_MS = 100;

    /** Client configuration shared by the consumers this instance creates. */
    private final HashMap<String, Object> props = new HashMap<>();

    /** Populates the client configuration: broker address, consumer group and auto-commit. */
    public Consumer() {
        props.put("bootstrap.servers", "172.21.35.63:9092");
        props.put("group.id", "gp1");
        props.put("enable.auto.commit", true);
    }

    /**
     * Subscribes to the topic, prints the first record received and then fails.
     *
     * <p>The consumer is closed before the exception propagates, so the client leaves the
     * group explicitly instead of being abandoned while still open. Per the Kafka client
     * documentation, closing a consumer with auto-commit enabled attempts to commit the
     * current offsets, so records returned by the last poll may be committed even though
     * only the first one was printed.
     *
     * @throws Exception always, with message {@code "111"}, as soon as one record has been
     *                   printed; exceptions raised by the Kafka client propagate as well
     */
    public void consume1() throws Exception {
        try (KafkaConsumer<String, String> kafkaConsumer = newKafkaConsumer()) {
            kafkaConsumer.subscribe(Collections.singleton(TOPIC));
            while (true) {
                // NOTE(review): poll(long) is kept for compatibility with the client version in
                // use; newer kafka-clients releases offer poll(Duration) instead - confirm version.
                ConsumerRecords<String, String> records = kafkaConsumer.poll(POLL_TIMEOUT_MS);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(this.toString() + "-consume1-" + record.value());
                    // Intentional failure after the first record (demo of a crashing consumer).
                    throw new Exception("111");
                }
            }
        }
    }

    /**
     * Subscribes to the topic and prints every record received, looping forever.
     *
     * <p>The method only terminates if the Kafka client throws; in that case the consumer is
     * closed before the exception propagates.
     */
    public void consume2() {
        try (KafkaConsumer<String, String> kafkaConsumer = newKafkaConsumer()) {
            kafkaConsumer.subscribe(Collections.singleton(TOPIC));
            while (true) {
                ConsumerRecords<String, String> records = kafkaConsumer.poll(POLL_TIMEOUT_MS);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(this.toString() + "-consume2-" + record.value());
                }
            }
        }
    }

    /** Creates a new, not yet subscribed, String/String consumer from this instance's config. */
    private KafkaConsumer<String, String> newKafkaConsumer() {
        return new KafkaConsumer<>(props, new StringDeserializer(), new StringDeserializer());
    }

    /**
     * Starts two threads: one running {@link #consume1()} (its exception is printed via
     * {@code printStackTrace()}) and one running {@link #consume2()}.
     *
     * @param args unused
     * @throws Exception declared for interface compatibility; nothing in the body throws it
     */
    public static void main(String[] args) throws Exception {

        new Thread(() -> {
            Consumer consumer1 = new Consumer();
            try {
                consumer1.consume1();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }).start();

        new Thread(() -> {
            Consumer consumer2 = new Consumer();
            consumer2.consume2();
        }).start();

    }
}
